package app.dwd;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

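/**
 * Base class for DWD-layer Flink SQL applications. {@link #init(int, int, String)}
 * builds the execution environment (REST port, parallelism), enables exactly-once
 * checkpointing to HDFS under the given {@code ck} directory, creates a
 * {@link StreamTableEnvironment}, and finally delegates the business logic to
 * {@link #run(StreamTableEnvironment)}, which subclasses implement.
 *
 * <p>A minimal usage sketch appears at the bottom of this file.</p>
 */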
public abstract class BaseAppSQL {
    public void init(int port, int parallelism, String ck) {
        System.setProperty("HADOOP_USER_NAME", "atguigu");
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", port);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(parallelism);

        // 1. Enable checkpointing every 5 seconds with exactly-once semantics
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        // 2. A checkpoint must complete within one minute, otherwise it is discarded
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 3. Retain externalized checkpoints after the job is cancelled
        env
            .getCheckpointConfig()
            .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 4. Use the HashMap state backend and store checkpoints on HDFS
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop162:8020/gmall-flink-follow/" + ck);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // The business logic is abstracted out and implemented by subclasses
        run(tEnv);

        try {
            env.execute(ck);
        } catch (Exception e) {
            // Pure SQL jobs are submitted by executeSql() and may leave no DataStream
            // operators behind, so execute() can fail here; the exception is only logged.
            e.printStackTrace();
        }
    }

    protected abstract void run(StreamTableEnvironment tEnv);

}
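
// A minimal usage sketch, not part of the original application: it assumes a
// hypothetical Kafka topic "ods_db" on broker hadoop162:9092, an illustrative
// schema, and the Flink SQL Kafka connector plus JSON format on the classpath.
// Subclasses only implement run(tEnv); main() wires everything through
// init(port, parallelism, ck).
class DwdExampleApp extends BaseAppSQL {
    public static void main(String[] args) {
        new DwdExampleApp().init(10001, 2, "DwdExampleApp");
    }

    @Override
    protected void run(StreamTableEnvironment tEnv) {
        // Map the (hypothetical) ODS topic to a table; column names are illustrative.
        tEnv.executeSql(
            "CREATE TABLE ods_db ( " +
            "  `database` STRING, " +
            "  `table` STRING, " +
            "  `data` MAP<STRING, STRING> " +
            ") WITH ( " +
            "  'connector' = 'kafka', " +
            "  'topic' = 'ods_db', " +
            "  'properties.bootstrap.servers' = 'hadoop162:9092', " +
            "  'properties.group.id' = 'dwd_example_app', " +
            "  'scan.startup.mode' = 'latest-offset', " +
            "  'format' = 'json' " +
            ")");

        // A simple continuous query; a real app would instead insert into a sink table.
        tEnv.sqlQuery("SELECT `database`, `table` FROM ods_db").execute().print();
    }
}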
